﻿/**

 * Copyright (c) 2015-2016, FastDev 刘强 (fastdev@163.com) & Quincy.

 *

 * Licensed under the Apache License, Version 2.0 (the "License");

 * you may not use this file except in compliance with the License.

 * You may obtain a copy of the License at

 *

 *      http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

using OF.DistributeService.Core.Common;
using OF.Notify.DataHost;
using OF.Notify.DataHost.Cluster;
using OF.Notify.DataHost.Cluster.Entity;
using OF.Notify.Entity;
using OF.Notify.Master.Zookeeper;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace OF.Notify.Master
{
    public class TopicClusterMaster
    {
        // Format of the lock key produced by GetLockDeliverClusterKey:
        // {0} is the topic enum, {1} is the cluster id.
        internal const string LockDeliverMsgKeyFormat = "`LDMK`{0}`{1}";
        // Owning cluster master (set in the constructor); consulted for its disposed flag
        // and for node proxy lookups.
        internal ClusterMaster clusterMaster = null;
        // Topic this instance is responsible for (set in the constructor).
        internal byte topicEnum;

        // Clusters registered by this master, keyed by TopicCluster.Id. Entries may refer to
        // clusters that are already disposed; readers check GetIsDisposed() themselves.
        public ConcurrentDictionary<int, TopicCluster> onLineTopicClusterDict;

        // Source of new cluster ids and data-collection ids for this topic.
        public TopicClusterIdProvider topicClusterIdProvider = null;
        
        // Cursor used by GetNextOnLineCluster to rotate through the cluster list. Despite the
        // name it holds a list position (last index + 1), not a cluster id.
        public int currentTopicClusterId = 0;

        // Per-consumer ConsumerTopicClusterMaster instances, looked up by consumer enum.
        internal ConsumerInstanceProvider<ConsumerTopicClusterMaster> consumerClusterMasterProvider = null;
        // Every consumer enum returned by the topic context for this topic.
        internal List<byte> allConsumerEnums = null;
        /// <summary>
        /// Creates the master for a single topic: records the owning cluster master and topic,
        /// builds one <c>ConsumerTopicClusterMaster</c> per consumer enum reported by the topic
        /// context, and initializes the (empty) online-cluster table and the id provider.
        /// </summary>
        /// <param name="clusterMaster">Owning cluster master.</param>
        /// <param name="zk">ZooKeeper proxy handed to the id provider.</param>
        /// <param name="topicEnum">Topic handled by this instance.</param>
        public TopicClusterMaster(ClusterMaster clusterMaster, ZooKeeperProxy zk, byte topicEnum)
        {
            this.clusterMaster = clusterMaster;
            this.topicEnum = topicEnum;

            var clusterContext = ClusterContext.Get();
            var contextForTopic = clusterContext.GetTopicContext(topicEnum);
            this.allConsumerEnums = contextForTopic.GetAllConsumerEnums();

            // One consumer-level master per consumer enum, all sharing this topic master.
            var mastersByConsumer = new Dictionary<byte, ConsumerTopicClusterMaster>();
            foreach (var consumerId in this.allConsumerEnums)
            {
                var consumerMaster = new ConsumerTopicClusterMaster(topicEnum, consumerId, this);
                mastersByConsumer.Add(consumerId, consumerMaster);
            }
            this.consumerClusterMasterProvider =
                new ConsumerInstanceProvider<ConsumerTopicClusterMaster>(mastersByConsumer);

            this.onLineTopicClusterDict = new ConcurrentDictionary<int, TopicCluster>();
            this.topicClusterIdProvider = new TopicClusterIdProvider(this, zk, topicEnum);
        }

        /// <summary>
        /// Tells whether the given node belongs to at least one registered cluster that has
        /// not been disposed.
        /// </summary>
        /// <param name="nodeId">Id of the node to look for.</param>
        /// <returns><c>true</c> if a non-disposed cluster lists the node; otherwise <c>false</c>.</returns>
        public bool IsNodeInActiveCluster(short nodeId)
        {
            return onLineTopicClusterDict.Values.Any(
                candidate => !candidate.GetIsDisposed()
                             && candidate.GetClusterNodeIds().Contains(nodeId));
        }
		

        /// <summary>
        /// Asks the id provider for the next cluster id.
        /// </summary>
        /// <returns>
        /// The id returned by the provider. Callers in this class treat
        /// <c>ClusterContext.InvalidateId</c> as "no id available".
        /// </returns>
        public int GetNewClusterId()
        {
            int nextClusterId = this.topicClusterIdProvider.GetNextClusterId();
            return nextClusterId;
        }

        /// <summary>
        /// Asks the id provider for the next data-collection id.
        /// </summary>
        /// <returns>
        /// The id returned by the provider. Callers in this class treat
        /// <c>ClusterContext.InvalidateId</c> as "no id available".
        /// </returns>
        public int GetNewCollectionId()
        {
            int nextCollectionId = this.topicClusterIdProvider.GetNextDataCollectionId();
            return nextCollectionId;
        }
        
        /// <summary>
        /// Assembles a runtime snapshot for this topic: the result object is passed to the
        /// notify service provider's <c>VisitAllNodes</c>, then the processed data of each
        /// requested consumer and the provider's maximum data-collection id are attached.
        /// </summary>
        /// <param name="consumerEnumList">Consumers whose processed data should be included.</param>
        /// <returns>The populated <c>AllNodesRuntimeData</c>.</returns>
        public AllNodesRuntimeData GetAllRuntimeData(List<byte> consumerEnumList)
        {
            var snapshot = new AllNodesRuntimeData
            {
                // Must exist before VisitAllNodes receives the snapshot.
                NodeRuntimeDataList = new List<NodeRuntimeData>()
            };
            var serviceProvider = ClusterContext.Get().GetNotifyServiceProvider();
            serviceProvider.VisitAllNodes(clusterMaster, topicEnum, snapshot);

            snapshot.ProcessedCollectionsDict = new Dictionary<byte, List<int>>();
            foreach (byte consumerId in consumerEnumList)
            {
                var consumerMaster = consumerClusterMasterProvider.GetInstance(consumerId);
                snapshot.ProcessedCollectionsDict.Add(consumerId, consumerMaster.GetProcessedData());
            }

            snapshot.MaxCollectionId = topicClusterIdProvider.GetMaxDataCollectionId();
            return snapshot;
        }

        /// <summary>
        /// Fetches the per-consumer processed trees from the given node, merges each one into
        /// the matching consumer master and then triggers the node-change notification fan-out
        /// for that node.
        /// </summary>
        /// <remarks>
        /// The fetch runs through <c>Util.SafeLoopUtilTrue</c> with the context's retry count and
        /// sleep interval (presumably it re-invokes the callback until it returns <c>true</c>).
        /// The callback gives up without data when the master is disposed or the node has no
        /// proxy; in that case nothing is merged and no notification is sent.
        /// </remarks>
        /// <param name="nodeId">Node whose processed trees are restored.</param>
        public void MergeProcessTree(short nodeId)
        {
            var context = ClusterContext.Get();
            Dictionary<byte, IdTreeNode> restoredTrees = null;

            Util.SafeLoopUtilTrue((attempt) =>
            {
                if (clusterMaster.isDisposed)
                {
                    return true;
                }

                var nodeProxy = clusterMaster.GetProxy(nodeId);
                if (nodeProxy == null)
                {
                    return true;
                }

                var response = nodeProxy.GetProcessedTreeForRestore(new GetProcessedTreeForRestoreRequest { topicEnum = this.topicEnum });
                if (!response.IsSuccess())
                {
                    // Ask the retry helper for another attempt.
                    return false;
                }

                restoredTrees = response.Data;
                return true;
            }, context.GetMaxApiRetryCount(), context.GetRetryFailSleepMS());

            if (restoredTrees == null)
            {
                return;
            }

            foreach (var treeByConsumer in restoredTrees)
            {
                if (clusterMaster.isDisposed)
                {
                    return;
                }
                IdTreeNode processedTree = treeByConsumer.Value;
                this.consumerClusterMasterProvider.GetInstance(treeByConsumer.Key).MergeNode(processedTree);
            }

            MultiSendGetClusterNodeChangeToNotifyCmd(nodeId);
        }
		
		/// <summary>
		/// Exchanges "cluster node change" commands between the given node and every other
		/// node that currently has a proxy: first each other node is sent a command carrying
		/// <paramref name="nodeId"/>, then the given node is sent one command per other node
		/// carrying that node's id. Does nothing when the given node has no proxy.
		/// </summary>
		/// <param name="nodeId">Node the notifications are about.</param>
		internal void MultiSendGetClusterNodeChangeToNotifyCmd(short nodeId)
        {
            DataNodeProxy joinedNode;
            if (!clusterMaster.dataNodeProxyDict.TryGetValue(nodeId, out joinedNode))
            {
                return;
            }

            var context = ClusterContext.Get();
            // Snapshot so both passes operate on the same set of proxies.
            List<DataNodeProxy> peerSnapshot = clusterMaster.dataNodeProxyDict.Values.ToList();

            // Pass 1: every other node is told about the given node.
            Parallel.ForEach(peerSnapshot, ClusterContext.GetParallelOptions(), (peer) =>
            {
                if (peer.GetId() != nodeId)
                {
                    Util.SafeLoopUtilTrue((attempt) =>
                    {
                        // Stop retrying once the receiver has gone offline.
                        if (clusterMaster.IsNodeOffLine(peer.GetId()))
                        {
                            return true;
                        }
                        var sendResult = peer.SendGetClusterNodeChangeToNotifyCmd(new SendGetClusterNodeChangeToNotifyCmdRequest
                        {
                            topicEnum = topicEnum,
                            nodeId = nodeId
                        });
                        return sendResult.IsSuccess();
                    }, context.GetMaxApiRetryCount(), context.GetRetryFailSleepMS());
                }
            });

            // Pass 2: the given node is told about every other node.
            Parallel.ForEach(peerSnapshot, ClusterContext.GetParallelOptions(), (peer) =>
            {
                short peerId = peer.GetId();
                if (peerId != nodeId)
                {
                    Util.SafeLoopUtilTrue((attempt) =>
                    {
                        // Stop retrying once the given node itself has gone offline.
                        if (clusterMaster.IsNodeOffLine(joinedNode.GetId()))
                        {
                            return true;
                        }
                        var sendResult = joinedNode.SendGetClusterNodeChangeToNotifyCmd(new SendGetClusterNodeChangeToNotifyCmdRequest
                        {
                            topicEnum = topicEnum,
                            nodeId = peerId
                        });
                        return sendResult.IsSuccess();
                    }, context.GetMaxApiRetryCount(), context.GetRetryFailSleepMS());
                }
            });
        }

        /// <summary>
        /// Creates a new cluster from the given nodes, announces it to those nodes and registers
        /// it in <c>onLineTopicClusterDict</c>. Nothing is created when the master is disposed or
        /// no cluster id could be obtained; the cluster is not registered if the master is
        /// disposed while the announcement is in flight.
        /// </summary>
        /// <param name="nodeList">Ids of the nodes forming the new cluster.</param>
        public void AssignTopicCluster(List<short> nodeList)
        {
            int freshClusterId = GetNewClusterId();
            bool cannotAssign = clusterMaster.isDisposed || freshClusterId == ClusterContext.InvalidateId;
            if (cannotAssign)
            {
                return;
            }

            var cluster = new TopicCluster(freshClusterId, nodeList, this.topicEnum);
            MultiSendAssignOnLineClusterNodes(nodeList, cluster.Id);

            if (!clusterMaster.isDisposed)
            {
                onLineTopicClusterDict.TryAdd(cluster.Id, cluster);
            }
        }

        /// <summary>
        /// Sends the same "assign online cluster nodes" request, in parallel, to every node in
        /// <paramref name="toNodes"/>. The send result is not inspected.
        /// </summary>
        /// <param name="toNodes">Ids of the nodes that form the cluster and receive the request.</param>
        /// <param name="onLineClusterId">Id of the cluster being announced.</param>
        internal void MultiSendAssignOnLineClusterNodes(List<short> toNodes, int onLineClusterId)
        {
            SendAssignOnLineClusterNodesRequest request = new SendAssignOnLineClusterNodesRequest
            {
                toNodes = toNodes,
                onLineClusterId = onLineClusterId,
                topicEnum = topicEnum
            };
            Parallel.ForEach(toNodes, ClusterContext.GetParallelOptions(), (toNodeId) => {
                if (clusterMaster.isDisposed)
                {
                    return;
                }
                var node = clusterMaster.GetProxy(toNodeId);
                if (node == null)
                {
                    // GetProxy can return null (other callers in this class guard for it);
                    // skip the node instead of failing the whole fan-out with a
                    // NullReferenceException wrapped in an AggregateException.
                    return;
                }
                node.SendAssignOnLineClusterNodes(request);
            });
        }

        /// <summary>
        /// Triggers a processed-tree sync for every consumer of this topic, passing no
        /// collections to delete. Stops as soon as the cluster master is found disposed.
        /// </summary>
        public void MultiSendSyncProcessedTree()
        {
            foreach (byte consumerId in allConsumerEnums)
            {
                if (clusterMaster.isDisposed)
                {
                    // Nothing follows the loop, so leaving it ends the method.
                    break;
                }
                MultiSendSyncProcessedTree(consumerId, null);
            }
        }

        /// <summary>
        /// Delegates a processed-tree sync to the consumer master of the given consumer.
        /// </summary>
        /// <param name="consumerEnum">Consumer whose processed tree is synced.</param>
        /// <param name="toDeleteCollections">
        /// Collection ids forwarded to the consumer master; may be <c>null</c>
        /// (the parameterless overload passes <c>null</c>).
        /// </param>
        public void MultiSendSyncProcessedTree(byte consumerEnum, List<int> toDeleteCollections)
        {
            var consumerMaster = consumerClusterMasterProvider.GetInstance(consumerEnum);
            consumerMaster.MultiSendSyncProcessedTree(toDeleteCollections);
        }

        /// <summary>
        /// Unregisters every cluster that contains the given node and disposes the online
        /// collections of each cluster that was actually removed.
        /// </summary>
        /// <remarks>
        /// Candidate clusters are collected before the lock is taken; <c>TryRemove</c> then
        /// guarantees that a cluster is disposed here only by the caller that removed it.
        /// NOTE(review): the method locks on the instance itself. A private gate object would
        /// be safer, but other code may lock on this instance too - confirm before changing.
        /// </remarks>
        /// <param name="id">Id of the node that went offline.</param>
        public void RemoveOnlineNode(short id)
        {
            List<TopicCluster> toRemoveCluster = new List<TopicCluster>();
            foreach (var clusterKV in onLineTopicClusterDict)
            {
                if (clusterKV.Value.GetClusterNodeIds().Contains(id))
                {
                    toRemoveCluster.Add(clusterKV.Value);
                }
            }

            lock (this)
            {
                foreach (var cluster in toRemoveCluster)
                {
                    TopicCluster removedCluster;
                    if (onLineTopicClusterDict.TryRemove(cluster.Id, out removedCluster))
                    {
                        DisposeClusterOnlineCollections(removedCluster);
                    }
                }
            }
        }

        /// <summary>
        /// Disposes the given cluster (once) and, when the disposal reports at least one valid
        /// collection id, tells every node of the cluster in parallel to dispose its online
        /// collection.
        /// </summary>
        /// <remarks>
        /// <c>DoDispose</c> returns a pair whose key is used as the collection id and whose value
        /// is used as the last collection id of the request. Each node send goes through
        /// <c>Util.SafeLoopUtilTrue</c> and is abandoned when the master is disposed or the node
        /// has no proxy.
        /// </remarks>
        /// <param name="cluster">Cluster to dispose.</param>
        internal void DisposeClusterOnlineCollections(TopicCluster cluster)
        {
            if (cluster.GetIsDisposed())
            {
                return;
            }

            KeyValuePair<int, int> disposedIds = cluster.DoDispose();
            int collectionId = disposedIds.Key;
            int lastCollectionId = disposedIds.Value;

            bool nothingToDispose = collectionId == ClusterContext.InvalidateId
                                    && lastCollectionId == ClusterContext.InvalidateId;
            if (nothingToDispose || clusterMaster.isDisposed)
            {
                return;
            }

            List<short> memberIds = cluster.GetClusterNodeIds();
            Parallel.ForEach(memberIds, ClusterContext.GetParallelOptions(), (memberId) =>
            {
                if (clusterMaster.isDisposed)
                {
                    return;
                }

                var context = ClusterContext.Get();
                Util.SafeLoopUtilTrue((attempt) =>
                {
                    if (clusterMaster.isDisposed)
                    {
                        return true;
                    }

                    var memberProxy = clusterMaster.GetProxy(memberId);
                    if (memberProxy == null)
                    {
                        return true;
                    }

                    var disposeRequest = new SendDisposeOnlineCollectionRequest
                    {
                        collectionId = collectionId,
                        lastCollectionId = lastCollectionId,
                        topicEnum = topicEnum
                    };
                    return memberProxy.SendDisposeOnlineCollection(disposeRequest).IsSuccess();
                }, context.GetMaxApiRetryCount(), context.GetRetryFailSleepMS());
            });
        }

        /// <summary>
        /// Builds the string key used to lock message delivery for one cluster of this topic.
        /// </summary>
        /// <param name="clusterId">Id of the cluster being locked.</param>
        /// <returns><c>LockDeliverMsgKeyFormat</c> filled with the topic enum and the cluster id.</returns>
        public string GetLockDeliverClusterKey(int clusterId)
        {
            byte topicPart = topicEnum;
            string lockKey = string.Format(LockDeliverMsgKeyFormat, topicPart, clusterId);
            return lockKey;
        }


				
		/// <summary>
		/// Announces, in parallel, a new online collection to every node of a cluster.
		/// </summary>
		/// <remarks>
		/// A node counts as failed when it has no proxy, when the call does not succeed, or when
		/// the call's data is <c>false</c>. Once a failure has been observed, nodes that have
		/// not been called yet are skipped (best effort - calls already running are not
		/// interrupted).
		/// </remarks>
		/// <param name="clusterNodes">Ids of the nodes in the cluster.</param>
		/// <param name="clusterId">Id of the cluster that owns the collection.</param>
		/// <param name="collectionId">Id of the collection being assigned.</param>
		/// <returns><c>true</c> when no failure was recorded; otherwise <c>false</c>.</returns>
		public bool MultiSendAssignOnlineCollection(List<short> clusterNodes, int clusterId, int collectionId)
        {
            var assignRequest = new SendAssignOnlineCollectionRequest
            {
                topicEnum = topicEnum,
                clusterId = clusterId,
                collectionId = collectionId
            };

            bool failed = false;
            Parallel.ForEach(clusterNodes, ClusterContext.GetParallelOptions(), (memberId) =>
            {
                DataNodeProxy member;
                bool isKnown = clusterMaster.dataNodeProxyDict.TryGetValue(memberId, out member);
                if (!isKnown)
                {
                    failed = true;
                    return;
                }

                // Another node already failed: do not bother calling this one.
                if (failed)
                {
                    return;
                }

                var reply = member.SendAssignOnlineCollection(assignRequest);
                bool accepted = reply.IsSuccess() && reply.Data;
                if (!accepted)
                {
                    failed = true;
                }
            });

            return !failed;
        }
		
		/// <summary>
		/// Picks the next cluster from the list in rotation and advances the shared cursor
		/// <c>currentTopicClusterId</c> to the position after the one returned.
		/// </summary>
		/// <remarks>
		/// The cursor is read and written without synchronization here. An empty list makes the
		/// modulo throw <c>DivideByZeroException</c>, so callers must pass a non-empty list.
		/// </remarks>
		/// <param name="clusterList">Non-empty list of candidate clusters.</param>
		/// <returns>The cluster at the selected position.</returns>
		internal TopicCluster GetNextOnLineCluster(List<TopicCluster> clusterList)
        {
            int position = currentTopicClusterId % clusterList.Count;
            currentTopicClusterId = position + 1;
            return clusterList[position];
        }
		
		/// <summary>
		/// Delivers one topic message to all nodes of an online cluster, allocating a new
		/// collection or switching to a cloned cluster when the current one cannot take it.
		/// </summary>
		/// <remarks>
		/// Error codes returned by this method:
		/// 1 - message body length + 1 exceeds <c>UInt16.MaxValue</c>;
		/// 2 - no cluster is registered;
		/// 3 - the send did not succeed;
		/// 4 - a cluster switch was required but <c>SwitchClusterWhenFull</c> returned null.
		/// At most one switch-and-retry is attempted per call.
		/// </remarks>
		/// <param name="request">Request carrying the message to deliver.</param>
		/// <returns>A success result carrying <c>true</c>, or an error result with one of the codes above.</returns>
		public CallServiceResult<bool> DeliverMessage(DeliverMessageRequest request)
        {
            TopicMessage message = request.topicMessage;
            // Reject bodies whose length + 1 does not fit in an unsigned 16-bit value.
            if (request.topicMessage.body.Length + 1 > UInt16.MaxValue)
            {
                return CallServiceResult<bool>.GetError(1, "Message length exceed max value " + (UInt16.MaxValue - 1));
            }
            TopicCluster onLineCluster = null;
            int collectionId = -1;
            bool result = false;
            
            // Work on a snapshot of the registered clusters.
            var clusterList = onLineTopicClusterDict.Values.ToList();
            if (clusterList.Count <= 0)
            {
                return CallServiceResult<bool>.GetError(2, "No service find to send message.");
            }
            // Rotate through the snapshot (at most once around) and take the first
            // cluster that is not disposed.
            for (int i1 = 0; i1 < clusterList.Count; i1++)
            {
                TopicCluster tempCluster = GetNextOnLineCluster(clusterList);
                if (!tempCluster.GetIsDisposed())
                {
                    onLineCluster = tempCluster;
                    break;
                }
            }

            bool isToSwitch = false;
            // The closure captures onLineCluster, collectionId, result and isToSwitch by
            // reference, so a second invocation after a switch operates on the new cluster.
            Action sendOnCurrentCluster = () =>
            {
                // Runs under a string-keyed lock built from the topic and cluster id
                // (presumably serializing deliveries to the same cluster - confirm in StringLock).
                ClusterContext.StringLock.Invoke(GetLockDeliverClusterKey(onLineCluster.Id), () =>
                {
                    if (clusterMaster.isDisposed)
                    {
                        result = false;
                        return;
                    }

                    if (onLineCluster.GetIsDisposed())
                    {
                        result = false;
                        return;
                    }

                    if (onLineCluster.CanDeliverMsg())
                    {
                        // The cluster can take the message: reuse its active collection.
                        collectionId = onLineCluster.GetActiveCollectionId();
                    }
                    else
                    {
                        // The cluster reports an invalid collection count: request a cluster
                        // switch from the caller instead of allocating another collection.
                        if (!onLineCluster.IsClusterCollectionCountValidate())
                        {
                            isToSwitch = true;
                            result = false;
                            return;
                        }

                        if (clusterMaster.isDisposed)
                        {
                            result = false;
                            return;
                        }

                        // Allocate a new collection, make it the active one and announce it
                        // to the cluster's nodes.
                        collectionId = GetNewCollectionId();
                        if (clusterMaster.isDisposed || collectionId == ClusterContext.InvalidateId)
                        {
                            result = false;
                            return;
                        }
                        onLineCluster.SetActiveCollection(collectionId);
                        if (clusterMaster.isDisposed)
                        {
                            result = false;
                            return;
                        }
                        //Util.LogInfo("MultiSen dAssignOnlineCollection:" + string.Join(" ", onLineCluster.GetClusterNodeIds()) + ", " + onLineCluster.Id + "," + collectionId);
                        bool isOk = MultiSendAssignOnlineCollection(onLineCluster.GetClusterNodeIds(), onLineCluster.Id, collectionId);
                        if (!isOk)
                        {
                            // Announcement failed: clear the active collection and mark the
                            // freshly allocated id as processed for every consumer.
                            result = false;
                            onLineCluster.SetActiveCollection(ClusterContext.InvalidateId);
                            SetCollectionProcessed(collectionId);
                            return;
                        }
                        if (!onLineCluster.CanDeliverMsg())
                        {
                            // Still not deliverable after the assignment: roll back the same way.
                            result = false;
                            onLineCluster.SetActiveCollection(ClusterContext.InvalidateId);
                            SetCollectionProcessed(collectionId);
                            return;
                        }
                    }

                    if (clusterMaster.isDisposed)
                    {
                        result = false;
                        return;
                    }
                    //Util.LogInfo("MultiSendMessage:" + string.Join(" ", onLineCluster.GetClusterNodeIds()));
                    result = MultiSendMessage(onLineCluster.GetClusterNodeIds(), collectionId, message);
                    return;
                });
            };
            if (onLineCluster != null)
            {
                sendOnCurrentCluster();
            }
            else
            {
                // Every cluster in the snapshot is disposed. The loop body runs at most once
                // (unconditional break): it just takes the next cluster in rotation, which is
                // then handed to SwitchClusterWhenFull below.
                for (int i1 = 0; i1 < clusterList.Count; i1++)
                {
                    onLineCluster = GetNextOnLineCluster(clusterList);
                    break;
                }
                result = false;
                if (onLineCluster != null)
                {
                    isToSwitch = true;
                }
            }

            if (!result && isToSwitch)
            {
                // Replace the cluster with a clone under a new id and retry the send once.
                onLineCluster = SwitchClusterWhenFull(onLineCluster);
                if (onLineCluster == null)
                {
                    return CallServiceResult<bool>.GetError(4, "Send message failed!");
                }
                isToSwitch = false;
                result = false;
                sendOnCurrentCluster();
            }
            if (result)
            {
                return CallServiceResult<bool>.GetSuccess(true);
            }
            else
            {
                return CallServiceResult<bool>.GetError(3, "Send message failed!");
            }
        }
		
        /// <summary>
        /// Sends one message, in parallel, to every node of a cluster for the given collection.
        /// </summary>
        /// <remarks>
        /// A node counts as failed when it has no proxy, when the call does not succeed, or when
        /// the call's data is <c>false</c>. Once a failure has been observed, nodes that have
        /// not been called yet are skipped (best effort - calls already running are not
        /// interrupted).
        /// </remarks>
        /// <param name="clusterNodes">Ids of the nodes in the cluster.</param>
        /// <param name="collectionId">Collection the message is stored in.</param>
        /// <param name="message">Message to send.</param>
        /// <returns><c>true</c> when no failure was recorded; otherwise <c>false</c>.</returns>
        public bool MultiSendMessage(List<short> clusterNodes, int collectionId, TopicMessage message)
        {
            var request = new SendMessageRequest
            {
                topicEnum = topicEnum,
                collectionId = collectionId,
                message = message
            };
            bool hasError = false;
            // Use the shared parallel options like every other fan-out helper in this class,
            // so message sends follow the same configured parallelism settings.
            Parallel.ForEach(clusterNodes, ClusterContext.GetParallelOptions(), (nodeId) =>
            {
                DataNodeProxy node = null;
                if (!clusterMaster.dataNodeProxyDict.TryGetValue(nodeId, out node))
                {
                    hasError = true;
                    return;
                }

                // Another node already failed: do not bother calling this one.
                if (hasError)
                {
                    return;
                }

                var callResult = node.SendMessage(request);
                if (!callResult.IsSuccess())
                {
                    hasError = true;
                    return;
                }

                bool isOk = callResult.Data;
                if (!isOk)
                {
                    hasError = true;
                }
            });
            return !hasError;
        }

        /// <summary>
        /// Calls <c>DisposeNodeOnInstantProcess</c> for the given node on every registered
        /// cluster, working on a snapshot of the cluster table.
        /// </summary>
        /// <param name="nodeId">Node to dispose on each cluster.</param>
        internal void DoDisposeNodeOnInstantProcessCollections(short nodeId)
        {
            List<TopicCluster> snapshot = onLineTopicClusterDict.Values.ToList();
            for (int i = 0; i < snapshot.Count; i++)
            {
                snapshot[i].DisposeNodeOnInstantProcess(nodeId);
            }
        }

		/// <summary>
		/// Handles the completion of an instant collection for one consumer: on success the
		/// collection is marked as processed for that consumer; on failure nothing is done.
		/// </summary>
		/// <param name="consumerEnum">Consumer that finished the collection.</param>
		/// <param name="isSuccessed">Whether the consumer finished successfully.</param>
		/// <param name="clusterId">
		/// Id of the cluster the collection belongs to. Currently unused; kept so existing
		/// callers keep compiling.
		/// </param>
		/// <param name="collectionId">Id of the finished collection.</param>
		public void OnInstantCollectionFinished(byte consumerEnum, bool isSuccessed, int clusterId, int collectionId)
        {
            if (!isSuccessed)
            {
                return;
            }
            SetCollectionProcessed(consumerEnum, collectionId);
        }

        /// <summary>
        /// Marks a collection as processed for every consumer of this topic by merging its id
        /// into each consumer master, then triggers a processed-tree sync for all consumers.
        /// Returns without syncing if the cluster master is found disposed during the merge.
        /// </summary>
        /// <param name="collectionId">Id of the collection to mark as processed.</param>
        internal void SetCollectionProcessed(int collectionId)
        {
            foreach (byte consumerId in allConsumerEnums)
            {
                if (clusterMaster.isDisposed)
                {
                    return;
                }
                var consumerMaster = consumerClusterMasterProvider.GetInstance(consumerId);
                consumerMaster.MergeNode(collectionId);
            }

            MultiSendSyncProcessedTree();
        }

        /// <summary>
        /// Marks a collection as processed for a single consumer and triggers that consumer's
        /// processed-tree sync, passing the collection id as the only collection to delete.
        /// </summary>
        /// <param name="consumerEnum">Consumer that processed the collection.</param>
        /// <param name="collectionId">Id of the processed collection.</param>
        internal void SetCollectionProcessed(byte consumerEnum, int collectionId)
        {
            var consumerMaster = consumerClusterMasterProvider.GetInstance(consumerEnum);
            consumerMaster.MergeNode(collectionId);

            var finishedCollections = new List<int> { collectionId };
            MultiSendSyncProcessedTree(consumerEnum, finishedCollections);
        }

        /// <summary>
        /// Replaces a cluster with a clone under a new cluster id: disposes the old cluster's
        /// online collections, clones it, announces the clone to its nodes and registers it.
        /// The old cluster is not removed from <c>onLineTopicClusterDict</c> here.
        /// </summary>
        /// <param name="oldCluster">Cluster to replace.</param>
        /// <returns>
        /// The newly registered cluster, or <c>null</c> when the master is disposed at any step
        /// or no new cluster id could be obtained.
        /// </returns>
        internal TopicCluster SwitchClusterWhenFull(TopicCluster oldCluster)
        {
            DisposeClusterOnlineCollections(oldCluster);
            if (clusterMaster.isDisposed)
            {
                return null;
            }

            int replacementId = GetNewClusterId();
            bool cannotSwitch = clusterMaster.isDisposed || replacementId == ClusterContext.InvalidateId;
            if (cannotSwitch)
            {
                return null;
            }

            TopicCluster replacement = oldCluster.GetCloneCluster(replacementId);
            MultiSendAssignOnLineClusterNodes(replacement.GetClusterNodeIds(), replacement.Id);
            if (clusterMaster.isDisposed)
            {
                return null;
            }

            onLineTopicClusterDict.TryAdd(replacement.Id, replacement);
            return replacement;
        }

		/// <summary>
		/// Disposes the online collections of every registered cluster, working on a snapshot
		/// of the cluster table. The clusters stay registered.
		/// </summary>
		public void MockDisposeAllCluster()
        {
            List<TopicCluster> snapshot = onLineTopicClusterDict.Values.ToList();
            snapshot.ForEach(registered => DisposeClusterOnlineCollections(registered));
        }
    }
}
